/**
 * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jivesoftware.smackx.bytestreams.ibb;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.jivesoftware.smack.Connection;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.filter.AndFilter;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.PacketExtension;
import org.jivesoftware.smack.packet.XMPPError;
import org.jivesoftware.smack.util.StringUtils;
import org.jivesoftware.smack.util.SyncPacketSend;
import org.jivesoftware.smackx.bytestreams.BytestreamSession;
import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;

/**
 * InBandBytestreamSession class represents an In-Band Bytestream session.
 * <p>
 * In-band bytestreams are bidirectional and this session encapsulates the
 * streams for both directions.
 * <p>
 * Note that closing the In-Band Bytestream session will close both streams. If
 * both streams are closed individually the session will be closed automatically
 * once the second stream is closed. Use the
 * {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be
 * closed automatically if one of them is closed.
 * 
 * @author Henning Staib
 */
public class InBandBytestreamSession implements BytestreamSession {

	/* XMPP connection */
	private final Connection connection;

	/* the In-Band Bytestream open request for this session */
	private final Open byteStreamRequest;

	/*
	 * the input stream for this session (either IQIBBInputStream or
	 * MessageIBBInputStream)
	 */
	private IBBInputStream inputStream;

	/*
	 * the output stream for this session (either IQIBBOutputStream or
	 * MessageIBBOutputStream)
	 */
	private IBBOutputStream outputStream;

	/* JID of the remote peer */
	private String remoteJID;

	/* flag to close both streams if one of them is closed */
	private boolean closeBothStreamsEnabled = false;

	/* flag to indicate if session is closed */
	private boolean isClosed = false;

	/**
	 * Constructor.
	 * 
	 * @param connection
	 *            the XMPP connection
	 * @param byteStreamRequest
	 *            the In-Band Bytestream open request for this session
	 * @param remoteJID
	 *            JID of the remote peer
	 */
	protected InBandBytestreamSession(Connection connection,
			Open byteStreamRequest, String remoteJID) {
		this.connection = connection;
		this.byteStreamRequest = byteStreamRequest;
		this.remoteJID = remoteJID;

		// initialize streams dependent to the uses stanza type
		switch (byteStreamRequest.getStanza()) {
		case IQ:
			this.inputStream = new IQIBBInputStream();
			this.outputStream = new IQIBBOutputStream();
			break;
		case MESSAGE:
			this.inputStream = new MessageIBBInputStream();
			this.outputStream = new MessageIBBOutputStream();
			break;
		}

	}

	public InputStream getInputStream() {
		return this.inputStream;
	}

	public OutputStream getOutputStream() {
		return this.outputStream;
	}

	public int getReadTimeout() {
		return this.inputStream.readTimeout;
	}

	public void setReadTimeout(int timeout) {
		if (timeout < 0) {
			throw new IllegalArgumentException("Timeout must be >= 0");
		}
		this.inputStream.readTimeout = timeout;
	}

	/**
	 * Returns whether both streams should be closed automatically if one of the
	 * streams is closed. Default is <code>false</code>.
	 * 
	 * @return <code>true</code> if both streams will be closed if one of the
	 *         streams is closed, <code>false</code> if both streams can be
	 *         closed independently.
	 */
	public boolean isCloseBothStreamsEnabled() {
		return closeBothStreamsEnabled;
	}

	/**
	 * Sets whether both streams should be closed automatically if one of the
	 * streams is closed. Default is <code>false</code>.
	 * 
	 * @param closeBothStreamsEnabled
	 *            <code>true</code> if both streams should be closed if one of
	 *            the streams is closed, <code>false</code> if both streams
	 *            should be closed independently
	 */
	public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
		this.closeBothStreamsEnabled = closeBothStreamsEnabled;
	}

	public void close() throws IOException {
		closeByLocal(true); // close input stream
		closeByLocal(false); // close output stream
	}

	/**
	 * This method is invoked if a request to close the In-Band Bytestream has
	 * been received.
	 * 
	 * @param closeRequest
	 *            the close request from the remote peer
	 */
	protected void closeByPeer(Close closeRequest) {

		/*
		 * close streams without flushing them, because stream is already
		 * considered closed on the remote peers side
		 */
		this.inputStream.closeInternal();
		this.inputStream.cleanup();
		this.outputStream.closeInternal(false);

		// acknowledge close request
		IQ confirmClose = IQ.createResultIQ(closeRequest);
		this.connection.sendPacket(confirmClose);

	}

	/**
	 * This method is invoked if one of the streams has been closed locally, if
	 * an error occurred locally or if the whole session should be closed.
	 * 
	 * @throws IOException
	 *             if an error occurs while sending the close request
	 */
	protected synchronized void closeByLocal(boolean in) throws IOException {
		if (this.isClosed) {
			return;
		}

		if (this.closeBothStreamsEnabled) {
			this.inputStream.closeInternal();
			this.outputStream.closeInternal(true);
		} else {
			if (in) {
				this.inputStream.closeInternal();
			} else {
				// close stream but try to send any data left
				this.outputStream.closeInternal(true);
			}
		}

		if (this.inputStream.isClosed && this.outputStream.isClosed) {
			this.isClosed = true;

			// send close request
			Close close = new Close(this.byteStreamRequest.getSessionID());
			close.setTo(this.remoteJID);
			try {
				SyncPacketSend.getReply(this.connection, close);
			} catch (XMPPException e) {
				throw new IOException("Error while closing stream: "
						+ e.getMessage());
			}

			this.inputStream.cleanup();

			// remove session from manager
			InBandBytestreamManager.getByteStreamManager(this.connection)
					.getSessions().remove(this);
		}

	}

	/**
	 * IBBInputStream class is the base implementation of an In-Band Bytestream
	 * input stream. Subclasses of this input stream must provide a packet
	 * listener along with a packet filter to collect the In-Band Bytestream
	 * data packets.
	 */
	private abstract class IBBInputStream extends InputStream {

		/* the data packet listener to fill the data queue */
		private final PacketListener dataPacketListener;

		/* queue containing received In-Band Bytestream data packets */
		protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();

		/* buffer containing the data from one data packet */
		private byte[] buffer;

		/* pointer to the next byte to read from buffer */
		private int bufferPointer = -1;

		/* data packet sequence (range from 0 to 65535) */
		private long seq = -1;

		/* flag to indicate if input stream is closed */
		private boolean isClosed = false;

		/* flag to indicate if close method was invoked */
		private boolean closeInvoked = false;

		/* timeout for read operations */
		private int readTimeout = 0;

		/**
		 * Constructor.
		 */
		public IBBInputStream() {
			// add data packet listener to connection
			this.dataPacketListener = getDataPacketListener();
			connection.addPacketListener(this.dataPacketListener,
					getDataPacketFilter());
		}

		/**
		 * Returns the packet listener that processes In-Band Bytestream data
		 * packets.
		 * 
		 * @return the data packet listener
		 */
		protected abstract PacketListener getDataPacketListener();

		/**
		 * Returns the packet filter that accepts In-Band Bytestream data
		 * packets.
		 * 
		 * @return the data packet filter
		 */
		protected abstract PacketFilter getDataPacketFilter();

		public synchronized int read() throws IOException {
			checkClosed();

			// if nothing read yet or whole buffer has been read fill buffer
			if (bufferPointer == -1 || bufferPointer >= buffer.length) {
				// if no data available and stream was closed return -1
				if (!loadBuffer()) {
					return -1;
				}
			}

			// return byte and increment buffer pointer
			return ((int) buffer[bufferPointer++]) & 0xff;
		}

		public synchronized int read(byte[] b, int off, int len)
				throws IOException {
			if (b == null) {
				throw new NullPointerException();
			} else if ((off < 0) || (off > b.length) || (len < 0)
					|| ((off + len) > b.length) || ((off + len) < 0)) {
				throw new IndexOutOfBoundsException();
			} else if (len == 0) {
				return 0;
			}

			checkClosed();

			// if nothing read yet or whole buffer has been read fill buffer
			if (bufferPointer == -1 || bufferPointer >= buffer.length) {
				// if no data available and stream was closed return -1
				if (!loadBuffer()) {
					return -1;
				}
			}

			// if more bytes wanted than available return all available
			int bytesAvailable = buffer.length - bufferPointer;
			if (len > bytesAvailable) {
				len = bytesAvailable;
			}

			System.arraycopy(buffer, bufferPointer, b, off, len);
			bufferPointer += len;
			return len;
		}

		public synchronized int read(byte[] b) throws IOException {
			return read(b, 0, b.length);
		}

		/**
		 * This method blocks until a data packet is received, the stream is
		 * closed or the current thread is interrupted.
		 * 
		 * @return <code>true</code> if data was received, otherwise
		 *         <code>false</code>
		 * @throws IOException
		 *             if data packets are out of sequence
		 */
		private synchronized boolean loadBuffer() throws IOException {

			// wait until data is available or stream is closed
			DataPacketExtension data = null;
			try {
				if (this.readTimeout == 0) {
					while (data == null) {
						if (isClosed && this.dataQueue.isEmpty()) {
							return false;
						}
						data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
					}
				} else {
					data = this.dataQueue.poll(this.readTimeout,
							TimeUnit.MILLISECONDS);
					if (data == null) {
						throw new SocketTimeoutException();
					}
				}
			} catch (InterruptedException e) {
				// Restore the interrupted status
				Thread.currentThread().interrupt();
				return false;
			}

			// handle sequence overflow
			if (this.seq == 65535) {
				this.seq = -1;
			}

			// check if data packets sequence is successor of last seen sequence
			long seq = data.getSeq();
			if (seq - 1 != this.seq) {
				// packets out of order; close stream/session
				InBandBytestreamSession.this.close();
				throw new IOException("Packets out of sequence");
			} else {
				this.seq = seq;
			}

			// set buffer to decoded data
			buffer = data.getDecodedData();
			bufferPointer = 0;
			return true;
		}

		/**
		 * Checks if this stream is closed and throws an IOException if
		 * necessary
		 * 
		 * @throws IOException
		 *             if stream is closed and no data should be read anymore
		 */
		private void checkClosed() throws IOException {
			/*
			 * throw no exception if there is data available, but not if close
			 * method was invoked
			 */
			if ((isClosed && this.dataQueue.isEmpty()) || closeInvoked) {
				// clear data queue in case additional data was received after
				// stream was closed
				this.dataQueue.clear();
				throw new IOException("Stream is closed");
			}
		}

		public boolean markSupported() {
			return false;
		}

		public void close() throws IOException {
			if (isClosed) {
				return;
			}

			this.closeInvoked = true;

			InBandBytestreamSession.this.closeByLocal(true);
		}

		/**
		 * This method sets the close flag and removes the data packet listener.
		 */
		private void closeInternal() {
			if (isClosed) {
				return;
			}
			isClosed = true;
		}

		/**
		 * Invoked if the session is closed.
		 */
		private void cleanup() {
			connection.removePacketListener(this.dataPacketListener);
		}

	}

	/**
	 * IQIBBInputStream class implements IBBInputStream to be used with IQ
	 * stanzas encapsulating the data packets.
	 */
	private class IQIBBInputStream extends IBBInputStream {

		protected PacketListener getDataPacketListener() {
			return new PacketListener() {

				private long lastSequence = -1;

				public void processPacket(Packet packet) {
					// get data packet extension
					DataPacketExtension data = (DataPacketExtension) packet
							.getExtension(DataPacketExtension.ELEMENT_NAME,
									InBandBytestreamManager.NAMESPACE);

					/*
					 * check if sequence was not used already (see XEP-0047
					 * Section 2.2)
					 */
					if (data.getSeq() <= this.lastSequence) {
						IQ unexpectedRequest = IQ
								.createErrorResponse(
										(IQ) packet,
										new XMPPError(
												XMPPError.Condition.unexpected_request));
						connection.sendPacket(unexpectedRequest);
						return;

					}

					// check if encoded data is valid (see XEP-0047 Section 2.2)
					if (data.getDecodedData() == null) {
						// data is invalid; respond with bad-request error
						IQ badRequest = IQ.createErrorResponse((IQ) packet,
								new XMPPError(XMPPError.Condition.bad_request));
						connection.sendPacket(badRequest);
						return;
					}

					// data is valid; add to data queue
					dataQueue.offer(data);

					// confirm IQ
					IQ confirmData = IQ.createResultIQ((IQ) packet);
					connection.sendPacket(confirmData);

					// set last seen sequence
					this.lastSequence = data.getSeq();
					if (this.lastSequence == 65535) {
						this.lastSequence = -1;
					}

				}

			};
		}

		protected PacketFilter getDataPacketFilter() {
			/*
			 * filter all IQ stanzas having type 'SET' (represented by Data
			 * class), containing a data packet extension, matching session ID
			 * and recipient
			 */
			return new AndFilter(new PacketTypeFilter(Data.class),
					new IBBDataPacketFilter());
		}

	}

	/**
	 * MessageIBBInputStream class implements IBBInputStream to be used with
	 * message stanzas encapsulating the data packets.
	 */
	private class MessageIBBInputStream extends IBBInputStream {

		protected PacketListener getDataPacketListener() {
			return new PacketListener() {

				public void processPacket(Packet packet) {
					// get data packet extension
					DataPacketExtension data = (DataPacketExtension) packet
							.getExtension(DataPacketExtension.ELEMENT_NAME,
									InBandBytestreamManager.NAMESPACE);

					// check if encoded data is valid
					if (data.getDecodedData() == null) {
						/*
						 * TODO once a majority of XMPP server implementation
						 * support XEP-0079 Advanced Message Processing the
						 * invalid message could be answered with an appropriate
						 * error. For now we just ignore the packet. Subsequent
						 * packets with an increased sequence will cause the
						 * input stream to close the stream/session.
						 */
						return;
					}

					// data is valid; add to data queue
					dataQueue.offer(data);

					// TODO confirm packet once XMPP servers support XEP-0079
				}

			};
		}

		@Override
		protected PacketFilter getDataPacketFilter() {
			/*
			 * filter all message stanzas containing a data packet extension,
			 * matching session ID and recipient
			 */
			return new AndFilter(new PacketTypeFilter(Message.class),
					new IBBDataPacketFilter());
		}

	}

	/**
	 * IBBDataPacketFilter class filters all packets from the remote peer of
	 * this session, containing an In-Band Bytestream data packet extension
	 * whose session ID matches this sessions ID.
	 */
	private class IBBDataPacketFilter implements PacketFilter {

		public boolean accept(Packet packet) {
			// sender equals remote peer
			if (!packet.getFrom().equalsIgnoreCase(remoteJID)) {
				return false;
			}

			// stanza contains data packet extension
			PacketExtension packetExtension = packet.getExtension(
					DataPacketExtension.ELEMENT_NAME,
					InBandBytestreamManager.NAMESPACE);
			if (packetExtension == null
					|| !(packetExtension instanceof DataPacketExtension)) {
				return false;
			}

			// session ID equals this session ID
			DataPacketExtension data = (DataPacketExtension) packetExtension;
			if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
				return false;
			}

			return true;
		}

	}

	/**
	 * IBBOutputStream class is the base implementation of an In-Band Bytestream
	 * output stream. Subclasses of this output stream must provide a method to
	 * send data over XMPP stream.
	 */
	private abstract class IBBOutputStream extends OutputStream {

		/* buffer with the size of this sessions block size */
		protected final byte[] buffer;

		/* pointer to next byte to write to buffer */
		protected int bufferPointer = 0;

		/* data packet sequence (range from 0 to 65535) */
		protected long seq = 0;

		/* flag to indicate if output stream is closed */
		protected boolean isClosed = false;

		/**
		 * Constructor.
		 */
		public IBBOutputStream() {
			this.buffer = new byte[(byteStreamRequest.getBlockSize() / 4) * 3];
		}

		/**
		 * Writes the given data packet to the XMPP stream.
		 * 
		 * @param data
		 *            the data packet
		 * @throws IOException
		 *             if an I/O error occurred while sending or if the stream
		 *             is closed
		 */
		protected abstract void writeToXML(DataPacketExtension data)
				throws IOException;

		public synchronized void write(int b) throws IOException {
			if (this.isClosed) {
				throw new IOException("Stream is closed");
			}

			// if buffer is full flush buffer
			if (bufferPointer >= buffer.length) {
				flushBuffer();
			}

			buffer[bufferPointer++] = (byte) b;
		}

		public synchronized void write(byte b[], int off, int len)
				throws IOException {
			if (b == null) {
				throw new NullPointerException();
			} else if ((off < 0) || (off > b.length) || (len < 0)
					|| ((off + len) > b.length) || ((off + len) < 0)) {
				throw new IndexOutOfBoundsException();
			} else if (len == 0) {
				return;
			}

			if (this.isClosed) {
				throw new IOException("Stream is closed");
			}

			// is data to send greater than buffer size
			if (len >= buffer.length) {

				// "byte" off the first chunk to write out
				writeOut(b, off, buffer.length);

				// recursively call this method with the lesser amount
				write(b, off + buffer.length, len - buffer.length);
			} else {
				writeOut(b, off, len);
			}
		}

		public synchronized void write(byte[] b) throws IOException {
			write(b, 0, b.length);
		}

		/**
		 * Fills the buffer with the given data and sends it over the XMPP
		 * stream if the buffers capacity has been reached. This method is only
		 * called from this class so it is assured that the amount of data to
		 * send is <= buffer capacity
		 * 
		 * @param b
		 *            the data
		 * @param off
		 *            the data
		 * @param len
		 *            the number of bytes to write
		 * @throws IOException
		 *             if an I/O error occurred while sending or if the stream
		 *             is closed
		 */
		private synchronized void writeOut(byte b[], int off, int len)
				throws IOException {
			if (this.isClosed) {
				throw new IOException("Stream is closed");
			}

			// set to 0 in case the next 'if' block is not executed
			int available = 0;

			// is data to send greater that buffer space left
			if (len > buffer.length - bufferPointer) {
				// fill buffer to capacity and send it
				available = buffer.length - bufferPointer;
				System.arraycopy(b, off, buffer, bufferPointer, available);
				bufferPointer += available;
				flushBuffer();
			}

			// copy the data left to buffer
			System.arraycopy(b, off + available, buffer, bufferPointer, len
					- available);
			bufferPointer += len - available;
		}

		public synchronized void flush() throws IOException {
			if (this.isClosed) {
				throw new IOException("Stream is closed");
			}
			flushBuffer();
		}

		private synchronized void flushBuffer() throws IOException {

			// do nothing if no data to send available
			if (bufferPointer == 0) {
				return;
			}

			// create data packet
			String enc = StringUtils.encodeBase64(buffer, 0, bufferPointer,
					false);
			DataPacketExtension data = new DataPacketExtension(
					byteStreamRequest.getSessionID(), this.seq, enc);

			// write to XMPP stream
			writeToXML(data);

			// reset buffer pointer
			bufferPointer = 0;

			// increment sequence, considering sequence overflow
			this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1);

		}

		public void close() throws IOException {
			if (isClosed) {
				return;
			}
			InBandBytestreamSession.this.closeByLocal(false);
		}

		/**
		 * Sets the close flag and optionally flushes the stream.
		 * 
		 * @param flush
		 *            if <code>true</code> flushes the stream
		 */
		protected void closeInternal(boolean flush) {
			if (this.isClosed) {
				return;
			}
			this.isClosed = true;

			try {
				if (flush) {
					flushBuffer();
				}
			} catch (IOException e) {
				/*
				 * ignore, because writeToXML() will not throw an exception if
				 * stream is already closed
				 */
			}
		}

	}

	/**
	 * IQIBBOutputStream class implements IBBOutputStream to be used with IQ
	 * stanzas encapsulating the data packets.
	 */
	private class IQIBBOutputStream extends IBBOutputStream {

		@Override
		protected synchronized void writeToXML(DataPacketExtension data)
				throws IOException {
			// create IQ stanza containing data packet
			IQ iq = new Data(data);
			iq.setTo(remoteJID);

			try {
				SyncPacketSend.getReply(connection, iq);
			} catch (XMPPException e) {
				// close session unless it is already closed
				if (!this.isClosed) {
					InBandBytestreamSession.this.close();
					throw new IOException("Error while sending Data: "
							+ e.getMessage());
				}
			}

		}

	}

	/**
	 * MessageIBBOutputStream class implements IBBOutputStream to be used with
	 * message stanzas encapsulating the data packets.
	 */
	private class MessageIBBOutputStream extends IBBOutputStream {

		@Override
		protected synchronized void writeToXML(DataPacketExtension data) {
			// create message stanza containing data packet
			Message message = new Message(remoteJID);
			message.addExtension(data);

			connection.sendPacket(message);

		}

	}

}
